package sinktest

import Source.SourceTest.{MySensorSource, SensorReading}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

import java.sql.{Connection, DriverManager, PreparedStatement}

object JdbcSinkTest {

  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val inputPath: String = "E:\\Flinklearn\\Flink\\resources\\data1\\sensor.txt"
    val inputDataStream: DataStream[String] = env.readTextFile(inputPath)

    // 转换成样例类类型
    val datastream = inputDataStream.map(
      data => {
        val fields: Array[String] = data.split(",")
        SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
      })

//    ds1.print()
    val stream: DataStream[SensorReading] = env.addSource(new MySensorSource())

    stream.addSink(new MyJdbcSinkFunc())


    env.execute()

  }

}

class MyJdbcSinkFunc() extends RichSinkFunction[SensorReading]{

  // 定义连接、预编译语句
  var conn:Connection=_
  var insertStmt:PreparedStatement =_
  var updateStmt:PreparedStatement =_

  override def open(parameters: Configuration): Unit = {

    Class.forName("com.mysql.cj.jdbc.Driver")
    conn=DriverManager.getConnection("jdbc:mysql://localhost:3306/bjpowernode?characterEncoding=utf8&useSSL=true&serverTimezone=UTC","root","123456")
    insertStmt = conn.prepareStatement("insert into sensor_temp(id,temp) value (?,?)")
    updateStmt = conn.prepareStatement("update sensor_temp set temp = ? where id = ?")
  }

  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {

    // 先执行更新操作，查到就更新
    updateStmt.setDouble(1,value.temperature)
    updateStmt.setString(2,value.id)
    updateStmt.execute()

    // 如果更新没有查到数据，那么就插入
    if (updateStmt.getUpdateCount==0){

      insertStmt.setString(1,value.id)
      insertStmt.setDouble(2,value.temperature)
      insertStmt.execute()
    }
  }

  override def close(): Unit = {

    // 将所有预编译语句关掉 将连接关掉
    insertStmt.close()
    updateStmt.close()
    conn.close()

  }
}

